-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix dataloss issue in restarting-streams-at-rebalancing mode #473
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I wonder if we could simplify a bit by not buffering at all but immediately pushing the 'to be buffered' records on the partition stream with this mechanism.
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord | ||
import zio.stream.Take | ||
|
||
case class PartitionStreamControl( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be private
?
Sounds like a good idea but:
|
Note: I'm still trying to validate this in production and seeing some problems (not sure yet if related) |
The problem was that With this change from my end the change is ready to merge |
Note: one more fix is required for this, I'm validating it in prod now. |
I think the fix is now complete, I plan to publish a detailed blog post about it that will help understand. |
Detailed explanation: https://ziverge.com/blog/zio-kafka-with-transactions-debugging-story |
Fixes #469